import logging
import string
import random
-import threading
-import queue
import datetime
import math
import collections
import signal
-import gc
from siridb.connector import SiriDBClient
return 10**(['s', 'ms', 'us', 'ns'].index(res['data'][0]['value'])*3)
-def queue_data(q, args, ts_factor):
+def queue_data(args, ts_factor):
r = random.Random()
r.seed(time.time() if args.seed is None else args.seed)
n = args.num_batches
while n and stop is False:
data = Series.get_data(args)
- q.put(data)
+ yield data
n -= 1
- q.put(None)
+ yield None
async def siridb_insert(siri, data, task_counter):
int(total_processed // (time.time() - start_time))))
finally:
task_counter.pop()
- # a = sys.getrefcount(data)
- # logging.info('ref: {}'.format(a))
- # gc.collect()
-async def dump_data(siri, q, args):
+async def dump_data(siri, args):
task_counter = []
try:
await siri.connect()
ts_factor = await get_ts_factor(siri)
- t = threading.Thread(target=queue_data, args=(q, args, ts_factor))
- t.start()
-
+ q = queue_data(args, ts_factor)
while True:
- data = q.get()
+ data = next(q)
if data is None:
break
await asyncio.sleep(0.2)
asyncio.ensure_future(siridb_insert(siri, data, task_counter))
+
# sleep 0 so the async loop will run to pick-up tasks
await asyncio.sleep(0)
exit('invalid date: {}'.format(args.start_date))
setup_logger(args)
- q = queue.Queue(maxsize=args.max_parallel)
signal.signal(signal.SIGINT, signal_handler)
siri = SiriDBClient(